package edu.nepu.flink.api.sql;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;

/**
 * @Date 2024/3/5 21:10
 * @Created by chenshuaijun
 */
public class StreamToTable {

    public static void main(String[] args) throws Exception {

        // 首先介绍的是创建表的环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<Integer> streamSource = env.fromElements(1, 2, 3, 4);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        Table table = tableEnv.fromDataStream(streamSource, $("id"));
        tableEnv.createTemporaryView("numbers",table);

        tableEnv.sqlQuery("select id from numbers").execute().print();
    }
}
